GCS にファイルが配置されたらイベント駆動で BigQuery にデータロードするサーバレスなジョブをつくってみた

GCS にファイルが配置されたらイベント駆動で BigQuery にデータロードするサーバレスなジョブをつくってみた

Clock Icon2020.07.07

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

こんにちは、みかみです。

やりたいこと

  • GCS のファイルを BigQuery にロードするジョブを手軽に(サーバレスで)実装したい
  • GCS にファイルが配置されたタイミングでイベント駆動でデータロードジョブを実行したい

GCS バケットを作成

以下のチュートリアルを参考に、gcloud コマンドで GCS バケットと Cloud Functions 関数をデプロイします。

Cloud Shell を使えば、ブラウザ上でコマンドラインが実行でき、gcloud ツールもプリインストールされているので便利です。

まずはデータ連携用の GCS バケットを作成します。

gcp_da_user@cloudshell:~/load (cm-da-mikami-yuki-258308)$ gsutil mb gs://job-mikami-data-load
Creating gs://job-mikami-data-load/...

GCP 管理画面からも、バケットが作成されたことが確認できました。

なお、GCS から BigQuery へデータロードする場合、GCS バケットと BigQuery のデータセットは同じロケーションに配置する必要があります。

BigQuery にデータロードする関数を Cloud Functions にデプロイ

引き続き Cloud Shell を使って、GCS のファイルデータを BigQuery にロードする関数を Cloud Functions にデプロイします。

今回は Python でコーディングしましたが、Cloud Functions の関数は、他に Node.js や Go、Java で実装することも可能です。

以下の Python コードを main.py というファイル名で保存しました。

from google.cloud import bigquery

def load_data(data, context):
    # check content-type
    if data['contentType'] != 'text/csv':
        print('Not supported file type: {}'.format(data['contentType']))
        return
    # get file info
    bucket_name = data['bucket']
    file_name = data['name']
    uri = 'gs://{}/{}'.format(bucket_name, file_name)

    dataset_id = 'load_from_gcs'
    table_id = 'table_sample'
    bq = bigquery.Client()
    dataset_ref = bq.dataset(dataset_id)

    # Set Load Config
    job_config = bigquery.LoadJobConfig()
    job_config.autodetect = True
    job_config.source_format = bigquery.SourceFormat.CSV
    job_config.write_disposition = 'WRITE_APPEND'

    # Load data
    load_job = client.load_table_from_uri(
        uri, dataset_ref.table(table_id), job_config=job_config
    )
    print("Starting job {}".format(load_job.job_id))
    load_job.result()
    print("Job finished.")

import が必要な外部ライブラリは requirements.txt ファイルで保存します。

google-cloud-bigquery==1.24.0

Cloud Storage トリガーを指定した場合、Cloud Functions 関数の data パラメータとして Cloud Storage object が渡されます。

Cloud Functions の Cloud Storage トリガーには、以下の4種類のイベントがあります。

  • ファイナライズ / 作成:オブジェクトの作成または上書き
  • 削除:オブジェクトの削除
  • アーカイブ:バージョニング対応バケットの場合に、オブジェクトのアーカイブまたは削除
  • メタデータを更新:オブジェクトのメタデータ変更

今回は連携ファイルが GCS に配置されたことをトリガーに BigQuery へのロード処理を実行したいので、トリガーには google.storage.object.finalize を指定します。

main.pyrequirements.txt ファイルを配置してあるディレクトリで、以下のコマンド実行します。

deploy する関数名と --trigger-resource で指定する GCS バケット名は環境に合わせて変更する必要があります。

gcloud functions deploy load_data --runtime python37 --trigger-resource job-mikami-data-load --trigger-event google.storage.object.finalize
gcp_da_user@cloudshell:~/load (cm-da-mikami-yuki-258308)$ gcloud functions deploy load_data --runtime python37 --trigger-resource job-mikami-data-load --trigger-event google.storage.object.finalize
Allow unauthenticated invocations of new function [load_data]? (y/N)?
  y
Deploying function (may take a while - up to 2 minutes)...done.
availableMemoryMb: 256
entryPoint: load_data
eventTrigger:
  eventType: google.storage.object.finalize
  failurePolicy: {}
  resource: projects/_/buckets/job-mikami-data-load
  service: storage.googleapis.com
ingressSettings: ALLOW_ALL
labels:
  deployment-tool: cli-gcloud
name: projects/cm-da-mikami-yuki-258308/locations/us-central1/functions/load_data
runtime: python37
serviceAccountEmail: cm-da-mikami-yuki-258308@appspot.gserviceaccount.com
sourceUploadUrl: https://storage.googleapis.com/gcf-upload-us-central1-2365985c-f7e0-4882-9f01-971d82702e1f/be62c39f-43fa-4d2d-8bed-4011eb493a96.zip?GoogleAccessId=service-797147019523@gcf-admin-robot.ia
m.gserviceaccount.com&Expires=1594008965&Signature=wVQjoPo7nrVwNAZIITMeZKcm%2Fu1hovjSlhX1gcDJ5JhACFHh6gx66H%2BOzVi%2BcDkMA%2B6G2pUSYT8wnm9PdRx6lmTJ4m27kpRtbPRBMfjM5oEALR1i0WpNxSV362yGjFLHgwddzaqfKgGXSUpU
MykkE9ZwpzFpHqT%2FvPPRNytUez6bDPrE%2Fkbmfi1B%2FK9AZQBRq5ZftakR1%2FqJjIOU5iZyfSSS%2FNegzNldff2uWwRGQknET2QBLEq6P0%2BOzSlEayS23krsrKSoNiLG3XUri8gbpNdaBg7iZ7exXaK4lUOz0li2%2FL14YSPYOuc10vD21knLU1hZwnjhAxiLD
6A%2FYLKj1Q%3D%3D
status: ACTIVE
timeout: 60s
updateTime: '2020-07-06T03:47:29.542Z'
versionId: '1'

Cloud Functions 関数が正常にデプロイされました。

サンプルデータ作成用の Cloud Functions 関数を準備

動作確認のため、サンプルデータを作成して GCS に CSV ファイルとして Put する関数を Cloud Functions にデプロイし、Cloud Scheduler でスケジュール実行してみます。

以下の Python コードと requirements.txt を準備しました。

import pandas as pd
from random import random
from datetime import datetime
from google.cloud import storage

def make_sample_data(data, context):
    now = datetime.now()
    df=pd.DataFrame({'value':[random() for _ in range(10)], 'create_time':[datetime.now() for _ in range(10)]})
    df = df[['value', "create_time"]]

    client = storage.Client()
    bucket_name = 'job-mikami-data-load'
    blob_name = 'input/sample_{}.csv'.format(datetime.now().strftime('%Y%m%d%H%M%S'))
    bucket = client.get_bucket(bucket_name)
    blob = bucket.blob(blob_name)

    blob.upload_from_string(data=df.to_csv(index=False), content_type='text/csv')
    print("Blob {} created.".format(blob_name))
google-cloud-storage==1.28.1
pandas==1.0.0

トリガーに Pub /Sub トピックを指定して Cloud Functions に関数をデプロイします。

gcp_da_user@cloudshell:~/sample (cm-da-mikami-yuki-258308)$ gcloud functions deploy make_sample_data --runtime python37 --trigger-topic make_sample_data
Allow unauthenticated invocations of new function [make_sample_data]?
(y/N)?  y
Deploying function (may take a while - up to 2 minutes)...done.
availableMemoryMb: 256
entryPoint: make_sample_data
eventTrigger:
  eventType: google.pubsub.topic.publish
  failurePolicy: {}
  resource: projects/cm-da-mikami-yuki-258308/topics/make_sample_data
  service: pubsub.googleapis.com
ingressSettings: ALLOW_ALL
labels:
  deployment-tool: cli-gcloud
name: projects/cm-da-mikami-yuki-258308/locations/us-central1/functions/make_sample_data
runtime: python37
serviceAccountEmail: cm-da-mikami-yuki-258308@appspot.gserviceaccount.com
sourceUploadUrl: https://storage.googleapis.com/gcf-upload-us-central1-2365985c-f7e0-4882-9f01-971d82702e1f/bfbca735-4c04-40ff-8569-d65d7533703c.zip?GoogleAccessId=service-797147019523@gcf-admin-robot.ia
m.gserviceaccount.com&Expires=1594012319&Signature=UE5utvgBdIyRh88Xcn3zekP5Tpk%2B1jVdgLa%2F5aLh3mBTBCWrqhqKWJpLXW6khvtwnbhw3qzMMb2TqMNQZP1FOU7IP8XV6%2B368rj7f5tm7tF9GG9z4opC8gFVqNb%2BSvtk%2FDExG9yxpVjc%2
FXDUEJ8yty0FOmWTXmmLMeLZFv9Ha38xp%2FzL%2BLkRW6tLKxwcnAfRpOyhjytf0XaNWT9v15sLdbKflCJfQT5gxztu1Rx17%2Frm6E1EnoefEIQEAUitWL3Qz1VwzYZiM1uA7wThoQRKMdiBninGO4Dlc9U%2FNsKE5P57d8bspSmB6D1bz9gmnZhYcKvV5vUvlL0VQnA
Yhf1rAg%3D%3D
status: ACTIVE
timeout: 60s
updateTime: '2020-07-06T04:43:49.075Z'
versionId: '1'

正常にデプロイできました。

続いて Pub/Sub のサブスクリプションを作成し、Cloud Scheduler のジョブを作成します。

gcp_da_user@cloudshell:~/sample (cm-da-mikami-yuki-258308)$ gcloud pubsub subscriptions create gcf-make-sample-data --topic make_sample_data
Created subscription [projects/cm-da-mikami-yuki-258308/subscriptions/gcf-make-sample-data].
gcp_da_user@cloudshell:~/sample (cm-da-mikami-yuki-258308)$ gcloud scheduler jobs create pubsub make_sample_data --schedule "*/5 * * * *" --topic make_sample_data --message-body "sample"
name: projects/cm-da-mikami-yuki-258308/locations/asia-northeast1/jobs/make_sample_data
pubsubTarget:
  data: c2FtcGxl
  topicName: projects/cm-da-mikami-yuki-258308/topics/make_sample_data
retryConfig:
  maxBackoffDuration: 3600s
  maxDoublings: 16
  maxRetryDuration: 0s
  minBackoffDuration: 5s
schedule: '*/5 * * * *'
state: ENABLED
timeZone: Etc/UTC
userUpdateTime: '2020-07-06T04:47:40Z'

これで、5分ごとにサンプルデータを連携バケットに Put する準備ができました。

実行結果を確認

5 分ほど待ってから GCS バケットを確認してみると、Cloud Scheduler のジョブが実行され、GCS にサンプルデータが出力されていることが確認できました。

BigQuery にも GCS のファイルデータが正常にロードされました。

約 1 時間後、期待通り 5 分ごとに複数のサンプルデータファイルが出力されていることを確認し、

ファイル作成のたびにちゃんとデータが BigQuery にロードされているかどうか確認してみます。

gcp_da_user@cloudshell:~ (cm-da-mikami-yuki-258308)$ bq query --nouse_legacy_sql \
> 'SELECT
>     create_time,
>     count
> FROM (
>     SELECT
>         FORMAT_TIME("%R", CAST(create_time as TIME)) as create_time,
>         count(*) as count
>     FROM
>         `cm-da-mikami-yuki-258308.load_from_gcs.table_load`
>     GROUP BY
>         FORMAT_TIME("%R", CAST(create_time as TIME))
>     )
> ORDER BY
>     create_time'
Waiting on bqjob_r6c8c13626ba9f6c1_0000017322bb387c_1 ... (0s) Current status: DONE   
+-------------+-------+
| create_time | count |
+-------------+-------+
| 04:50       |    10 |
| 04:55       |    10 |
| 05:00       |    10 |
| 05:05       |    10 |
| 05:10       |    10 |
| 05:15       |    10 |
| 05:20       |    10 |
| 05:25       |    10 |
| 05:30       |    10 |
| 05:35       |    10 |
| 05:40       |    10 |
| 05:45       |    10 |
| 05:50       |    10 |
| 05:55       |    10 |
| 06:00       |    10 |
| 06:05       |    10 |
+-------------+-------+

期待通り、GCS にファイルオブジェクトが作成タイミングでイベント駆動で Cloud Functions 関数が実行され、全てのサンプルデータが BigQuery にロードされたことが確認できました。

まとめ(所感)

GCP の Cloud Functions を使えば、AWS の Lambda のように、ファイルストレージにオブジェクトが作成されたタイミングでイベント駆動で必要な処理を実行することができます。

GCS から BigQuery にデータロードする関数を Cloud Functions にデプロイすることで、サーバー環境やツールなどの大掛かりな準備なしで、データファイルが GCS に配置されたタイミングで即座に連携データを BigQuery にロードするジョブを手軽に実装することができました。

動作確認やデプロイなどに Google Cloud SDK や クライアントライブラリを使いたい場合、Cloud Shell を使えば実行環境の準備をする必要もないので、ジョブ実装のハードルはさらに下がります。

Cloud Functions の関数には、最大 9 分のタイムアウト制限や、最大 2,048 MBのメモリ制限があるので、連携データが大量な場合などには考慮が必要かと思いますが、環境構築などの手間なく低コストで使用できる Cloud Functions は非常に使いやすいと思いました。

参考

Share this article

facebook logohatena logotwitter logo

© Classmethod, Inc. All rights reserved.